package com.zhang.hadoop.flink.test6;

import com.zhang.hadoop.flink.base.ClickSource;
import com.zhang.hadoop.flink.base.Event;
import com.zhang.hadoop.flink.test5.UrlViewCount;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Comparator;

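/**
 * @author: zhang yufei
 * @createTime:2022/7/10 10:30
 * @description: Flink example that counts page views per URL in event-time windows and prints
 *               the N most visited URLs of each window.
 */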
public class TopNExample {

    /**
     * Builds and runs the job: reads click events, counts views per URL in sliding event-time
     * windows, then ranks the URLs of each window and prints the top three.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Read the click events and take the event time from Event.timestamp.
        // Duration.ZERO means no out-of-orderness is tolerated by the watermark.
        SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event event, long l) {
                                return event.timestamp;
                            }
                        })
                );

        // 1. Key by URL and count the views of each URL per window.
        SingleOutputStreamOperator<UrlViewCount> urlCountStream = stream.keyBy(data -> data.url)
                // Sliding window: 10 s long, advancing every 5 s. The tumbling assigner's
                // two-argument factory takes (size, offset), so it would not slide.
                .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                .aggregate(new UrlViewCountAgg(), new UrlViewCountResult());

        urlCountStream.print("url count");

        // 2. Collect the counts that belong to the same window (same window end) and rank them.
        urlCountStream.keyBy(data -> data.windowEnd)
                .process(new TopNProcessResult(3))
                .print();

        env.execute();
    }

    /**
     * Keyed process function that buffers the {@link UrlViewCount} records arriving for one key
     * (the key is used as the window end timestamp) and, when the event-time timer for that key
     * fires, emits a formatted ranking of at most {@code n} URLs ordered by descending count.
     */
    public static class TopNProcessResult extends KeyedProcessFunction<Long, UrlViewCount, String> {

        // Maximum number of entries in the emitted ranking.
        private final Integer n;

        // Buffers every UrlViewCount received for the current key until the timer fires.
        private ListState<UrlViewCount> urlViewCountListState;

        /**
         * @param n maximum number of URLs to include in each ranking
         */
        public TopNProcessResult(Integer n) {
            this.n = n;
        }

        /**
         * Obtains the list state handle from the runtime context.
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            urlViewCountListState = getRuntimeContext().getListState(
                    new ListStateDescriptor<UrlViewCount>("url-count-list", Types.POJO(UrlViewCount.class)));
        }

        /**
         * Stores the incoming count and registers an event-time timer 1 ms after the current key
         * (the window end).
         */
        @Override
        public void processElement(UrlViewCount value, Context context, Collector<String> collector) throws Exception {
            // Buffer the record in state.
            urlViewCountListState.add(value);
            // Register the timer at windowEnd + 1 ms.
            context.timerService().registerEventTimeTimer(context.getCurrentKey() + 1);
        }

        /**
         * Sorts the buffered counts in descending order, emits the ranking text and releases the
         * buffered state for this key.
         */
        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            ArrayList<UrlViewCount> urlViewCountArrayList = new ArrayList<>();
            Iterable<UrlViewCount> buffered = urlViewCountListState.get();
            // Defensive: treat a missing state value as an empty buffer.
            if (buffered != null) {
                for (UrlViewCount urlViewCount : buffered) {
                    urlViewCountArrayList.add(urlViewCount);
                }
            }
            // Everything has been copied out; drop the state so it does not accumulate per window.
            urlViewCountListState.clear();

            // Descending by count. Long.compare avoids the truncation/sign flip that casting a
            // long difference to int can cause.
            urlViewCountArrayList.sort((o1, o2) -> Long.compare(o2.count, o1.count));

            // Build the output text.
            StringBuilder result = new StringBuilder();
            result.append("----\n");
            result.append("窗口结束时间：" + LocalDateTime.ofInstant(Instant.ofEpochMilli(ctx.getCurrentKey()), ZoneId.systemDefault()) + "\n");
            // Emit the first n entries, or fewer when the window saw fewer than n distinct URLs.
            int limit = Math.min(n, urlViewCountArrayList.size());
            for (int i = 0; i < limit; i++) {
                UrlViewCount urlViewCount = urlViewCountArrayList.get(i);
                result.append("No." + (i + 1) + " ");
                result.append("url:" + urlViewCount.url + " ");
                result.append("访问量：" + urlViewCount.count + "\n");
            }
            result.append("----\n");
            out.collect(result.toString());
        }
    }

    /**
     * Incremental aggregate that counts events: the accumulator starts at 0 and is incremented by
     * one for every {@link Event} added.
     */
    public static class UrlViewCountAgg implements AggregateFunction<Event, Long, Long> {

        /** Starts a new count at zero. */
        @Override
        public Long createAccumulator() {
            return 0L;
        }

        /** Adds one to the count for each incoming event; the event content is not inspected. */
        @Override
        public Long add(Event event, Long accumulator) {
            return accumulator + 1;
        }

        /** Returns the count as the aggregation result. */
        @Override
        public Long getResult(Long accumulator) {
            return accumulator;
        }

        /**
         * Combines two partial counts by adding them, so the function also works with window
         * assigners that merge windows instead of returning {@code null}.
         */
        @Override
        public Long merge(Long aLong, Long accumulator) {
            return aLong + accumulator;
        }
    }

    /**
     * Window function that attaches window metadata to a pre-aggregated count: it takes the single
     * count delivered for the URL key and emits a {@link UrlViewCount} carrying the URL, the count
     * and the window's start and end timestamps.
     */
    public static class UrlViewCountResult extends ProcessWindowFunction<Long, UrlViewCount, String, TimeWindow> {

        @Override
        public void process(String url, Context context, Iterable<Long> iterable, Collector<UrlViewCount> collector) throws Exception {
            // Window boundaries of the pane being emitted.
            final TimeWindow window = context.window();
            final Long windowStart = window.getStart();
            final Long windowEnd = window.getEnd();

            // The first element of the iterable is taken as the count for this URL.
            final Long viewCount = iterable.iterator().next();

            final UrlViewCount record = new UrlViewCount(url, viewCount, windowStart, windowEnd);
            collector.collect(record);
        }
    }
}
